package com.atguigu.flink.datastreamapi;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2023/11/10
 *
 * <p>Demo of {@code DataStream.partitionCustom}: reads text lines from a socket and routes
 * each line to a downstream subtask chosen by a user-defined {@link Partitioner}, then
 * prints the records.
 *
 * <p>{@code partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector)} takes:
 * <ul>
 *   <li>a partitioner, which decides from the key which downstream channel receives the
 *       record;</li>
 *   <li>a key selector, which picks what part of each upstream record is used as the key
 *       (here the whole line).</li>
 * </ul>
 */
public class Demo3_CustomPartitioner
{
    /**
     * Builds and runs the job: socket source on {@code hadoop102:8888}, custom range
     * partitioning, print sink named {@code "myprint"}.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if job execution fails; the original exception is kept
     *                               as the cause
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // The print sink inherits this parallelism, so the partitioner sees numPartitions == 3.
        env.setParallelism(3);

        env
            .socketTextStream("hadoop102", 8888)
            // The whole input line is the key handed to the partitioner.
            .partitionCustom(new RangePartitioner(), line -> line)
            .print().name("myprint");

        try {
            env.execute();
        } catch (Exception e) {
            // Surface the failure instead of only printing it, so the process does not
            // look like it finished successfully.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Routes numeric string keys to a downstream subtask by value range.
     *
     * <p>When at least {@value #REQUIRED_PARTITIONS} partitions are available:
     * <ul>
     *   <li>values below {@value #SMALL_UPPER_BOUND} go to partition 0;</li>
     *   <li>values below {@value #MEDIUM_UPPER_BOUND} go to partition 1;</li>
     *   <li>everything else, including keys that cannot be parsed as an {@code int},
     *       goes to the last partition ({@code numPartitions - 1}).</li>
     * </ul>
     * With fewer partitions every record goes to partition 0 (global aggregation).
     */
    private static final class RangePartitioner implements Partitioner<String>
    {
        private static final long serialVersionUID = 1L;

        /** Minimum downstream parallelism needed for range-based routing. */
        private static final int REQUIRED_PARTITIONS = 3;

        /** Exclusive upper bound of the values routed to partition 0. */
        private static final int SMALL_UPPER_BOUND = 100;

        /** Exclusive upper bound of the values routed to partition 1. */
        private static final int MEDIUM_UPPER_BOUND = 1000;

        /**
         * Chooses the target partition for a key.
         *
         * @param key           the record key; expected to be the decimal text of an
         *                      {@code int}, surrounding whitespace is ignored
         * @param numPartitions parallelism of the downstream operator
         * @return an index in the range {@code 0 .. numPartitions - 1}
         */
        @Override
        public int partition(String key, int numPartitions) {
            if (numPartitions < REQUIRED_PARTITIONS) {
                // Not enough channels for the three ranges: send everything to one place.
                return 0;
            }

            final int catchAllPartition = numPartitions - 1;
            if (key == null) {
                return catchAllPartition;
            }

            final int value;
            try {
                value = Integer.parseInt(key.trim());
            } catch (NumberFormatException ignored) {
                // Blank, non-numeric or out-of-int-range input must not fail the whole
                // streaming job; treat it as "remaining data".
                return catchAllPartition;
            }

            if (value < SMALL_UPPER_BOUND) {
                return 0;
            }
            if (value < MEDIUM_UPPER_BOUND) {
                return 1;
            }
            return catchAllPartition;
        }
    }
}
